Java 读写锁 - ReentrantReadWriteLock

前言

关于java的读写锁 ReentrantReadWriteLock,在jdk1.8之后引入了 StampedLock。

ReentrantReadWriteLock

ReentrantReadWriteLock是基于AQS同步队列实现的读写锁,其允许读操作之间不阻塞,写操作与写操作、读操作都是阻塞的,即如果要获取写锁,必须保证写锁和读锁都没有被占用。ReentrantReadWriteLock的读锁是不能被升级为写锁,但是写锁是可以降为读锁。

使用实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Count {
private final List<Data> list = new ArrayList<>();
private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
//获取读锁
private final Lock r = rw.readLock();
//获取写锁
private final Lock w = rw.writeLock();

public Data get(int index) {
r.lock();
try { return list.get(index); }
finally { r.unlock(); }
}
public List getAll() {
r.lock();
try { return Collections.unmodifiableList(list); }
finally { r.unlock(); }
}
public Data set(int index, Data value) {
w.lock();
try { return list.set(index, value); }
finally { w.unlock(); }
}
public void add(Data value) {
w.lock();
try { list.add(value); }
finally { w.unlock(); }
}
}

官方实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// 在获取写锁之前必须释放读锁,因为写锁和任何其他写锁和所有读锁互斥
// 读锁不能升级为写锁
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 写锁可以降级为读锁(持有写锁的线程可以读锁)
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // 释放写锁,但还是持有读锁
}
}

try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}

部分源码分析

接下来分析其获取读写锁和释放读写锁的源码分析,如果看过AQS同步队列和ReentrantLock的源码,那么读接下来的代码会很轻松,AQS同步队列和ReentrantLock的知识不再做介绍了。ReentrantReadWriteLock的读锁对应于AQS中的共享锁,而写锁对应AQS中的独占锁。

同ReentrantLock相同,ReentrantReadWriteLock也是使用内部类Sync继承了AQS,通过继承Sync实现了公平锁和非公平锁。读写锁可以使用这两种方式获取锁,默认为非公平锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;


/*
* Shared代表读,exclusive代表写,AQS的state中的前16位代表读锁占有的线程个数,后16位代表写锁占有线程个数
* 假设存储的数字为c, 如果想得到占有读锁的个数,c >>> 16, 如果想得到占有写锁的个数,c & ((1 << 16) - 1)
*/
static final int SHARED_SHIFT = 16;
// 读锁占有次数的加1的单位
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 锁最大持有次数
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

// 传入的参数一般为state,返回获得读锁的次数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 传入的参数一般为state,返回获得写锁的次数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

// 记录每个线程读锁重入的次数
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}

static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}

// 各个线程对于一个HoldCounter,记录了每个线程读锁重入的次数
private transient ThreadLocalHoldCounter readHolds;

// 最近一个获取读锁的线程的HoldCounter
private transient HoldCounter cachedHoldCounter;

// 第一个获取读锁的线程
private transient Thread firstReader = null;
// 第一个获取读锁的线程重入的次数
private transient int firstReaderHoldCount;

Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}

...
}


// 非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}

// 公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}

写锁的获取

写锁的获取是直接调用了AQS的acquire方法,AQS使用了模板方法,实际是通过Sync中的tryAcquire的方法判断获取写锁是否成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 1. 如果读的线程或写的线程不为0,且占有锁的线程不是当期线程,失败
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 2. 如果重入的次数大于MAX_COUNT,失败,抛出异常
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
* 3. 否则,该线程可以获取锁,更新state的状态
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
// c!=0 代表必定有线程占有读锁或者写锁
if (c != 0) {
// w==0说明有线程占有读锁,无法获得,如果w!=0,不是占有当前写锁的线程不能获得锁
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 重入,当前线程已经获得写锁,无需setExclusiveOwnerThread(current);
setState(c + acquires);
return true;
}
//writerShouldBlock在公平锁中需要判断前驱节点无等待获取锁返回false,非公平锁直接返回false
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 到达这说明无读写锁的占有,且符合(公平或非公平)锁的获取锁的策略,获得锁
setExclusiveOwnerThread(current);
return true;
}

在c != 0的情况下,说明读锁或者写锁其中一个或者两个被线程占有,当w==0说明,有线程占有读锁,这时就算当前获取锁的线程就是占有读锁的线程,也会返回false,只有w!=0(即当前有线程占有写锁),并且该线程是就是当前获取锁的线程才允许获取写锁。这说明线程的读锁是无法升级成写锁的,写锁和所有的读锁阻塞,即使是自身占有写锁,而写锁是支持重入的。

写锁的释放

同样,写锁的释放是直接调用了AQS的release方法,AQS使用了模板方法,实际是通过Sync中的tryRelease的方法判断释放写锁是否成功

1
2
3
4
5
6
7
8
9
10
11
12
13
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获得写锁的次数减1
int nextc = getState() - releases;
// 如果为0,表示没有线程占有
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
// 重新设置state的值
setState(nextc);
return free;
}

每次释放锁,获得写锁的次数减1,如果为0,则释放写锁,这ReentrantLock的可重入方法实相同

读锁的获取

读锁的获取是直接调用了AQS的acquireShared方法,AQS使用了模板方法,实际是通过Sync中的tryAcquireShared的方法判断获取写锁是否成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 1. 如果写锁被另一个线程占有,失败
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 2. 根据锁获取的策略判断是否需要阻塞,(对于公平锁来说如果等待队列中还有线程,那个必定是获取写锁的,必须阻塞,在等待队列中排在其后面,对于非公平锁来说,会有一个启发式的算法,防止线程一直获取不到写锁,处于饥饿状态)
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
* 3. 如果第2步失败,可能是CAS失败,或者其他线程占有写锁,交给fullTryAcquireShared处理
*/
Thread current = Thread.currentThread();
int c = getState();
// 如果写锁被其他线程持有,返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// 判断是否需要阻塞,读锁占有次数释放大于MAX_COUNT,CAS是否成功
// if成功则说明该线程已经获取锁
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
// r==0 为第一个持有读锁的线程,
// 第一个线程的HoldCount不记录在ThreadLocalHoldCounter中
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
// 如果当前线程不是最新持有读锁的线程,更新cachedHoldCounter
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 读锁持有数量加1
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}


// 非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}

// 公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}


/*
* 如果持有锁的线程不为空,且后继节点不为空,并且是独占锁,则返回true,在ReentrantReadWriteLock防止中防止获取写锁的线程永远得不到锁,造成饥饿现象
*/
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}

在获取读锁的过程中是不需要判断重入的,获取读锁的过程中首先需要判断有写锁的占用,接着需要判读是否需要阻塞:在公平锁下,还需要保证等待队列中不存在等待的线程,如果有则说明其中必定有获取写锁的进程存在,则当前获取读锁的线程就需要阻塞;在非公平锁下,为了防止造成饥饿现象,如果下一个线程是获取写锁的进程,则需要阻塞,否则可以直接获得读锁。最后获取锁失败则调用fullTryAcquireShared方法,其中要处理将写锁降级为读锁的可能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
// 如果有线程占有写锁,且不是当前线程,返回-1
if (exclusiveCount(c) != 0) {
// 如果是当前线程获得写锁,则读锁是可以获得的,这称为写锁降为读锁
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// CAS增加读锁个数,同tryAcquireShared中的代码
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

读锁的释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 释放的锁是firstReader
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 将对应线程的读锁获取次数减1
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
// CAS设置新的state
if (compareAndSetState(c, nextc))
// 如果为0,说明读锁释放完成
return nextc == 0;
}
}

注意

  • 写锁可以降为读锁,即一个线程A获得写锁,其他线程是无法获得读锁的,但是A线程可以获得,写锁降级为读锁不需要判断等待队列是否有写锁在,不用遵循公平锁和非公平锁的原则
  • 读锁是不用判断可重入的(没有必要),写锁是需要判断的

  • 写锁是和所有的读锁互斥的,包括获取写锁的线程正在占有读锁

0%